package com.zcreate.launch
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.spark.sql.EsSparkSQL
import org.example.constant.ApolloConst
import org.example.launch.TrafficDayToES.{mysqlConfig, selectFromMysql}
import org.joda.time.DateTime

import java.text.SimpleDateFormat

import java.text.SimpleDateFormat

/**
 * 风险与报警生成，将企业统计数据,驾驶人和车辆数据实时写入ES
 */
object RiskStatistics {
  def main(args: Array[String]): Unit = {
    //1、获取ES连接
    var sparkSession: SparkSession = SparkSession.builder()
      //      .appName("EnterpriseSafetyProductionInfo")
      //      .master("local[*]")
      .config("es.nodes", ApolloConst.esNodes)
      .config("es.port", ApolloConst.esPort)
      .config("es.read.field.as.array.include", "warnList") //读取复杂的数据结构
      .config("es.mapping.date.rich", "false") //数据时间格式数据异常处理
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.sql.caseSensitive", "true") // 设置字段区分大小写
      .getOrCreate()
    //保存到ES（企业端-风险统计-驾驶人（按天））
    EsSparkSQL.saveToEs(getDriverRiskStatisticalByDay(sparkSession),"driver_risk_statistical_day_index/_doc")
    //保存到ES（企业端-风险统计-车辆（按天））
    EsSparkSQL.saveToEs(getVehicleRiskStatisticalByDay(sparkSession),"vehicle_risk_statistical_day_index/_doc")
    //保存到ES（企业端-风险统计-企业（按天））
    EsSparkSQL.saveToEs(getEnterpriseRiskStatisticalByDay(sparkSession),"enterprise_risk_statistical_day_index/_doc")
  }

  /**
   *企业端-风险统计-驾驶人（按天）
   * @param sparkSession
   * @return
   */
  def getDriverRiskStatisticalByDay(sparkSession: SparkSession):DataFrame=
  {
    val format = new SimpleDateFormat("yyyy-MM-dd 00:00:00")
    val time = new DateTime().toString("yyyy-MM-dd 00:00:00")
    val time1 = new DateTime().plusDays(-1).toString("yyyy-MM-dd 00:00:00")
    val to: Long = format.parse(time).getTime
    val from: Long = format.parse(time1).getTime
    var builder: SearchSourceBuilder = new SearchSourceBuilder() //用于查询的对象
    val queryBuilder = QueryBuilders.boolQuery() //添加查询规则
    queryBuilder.must(QueryBuilders.rangeQuery("riskDate").gte(from).lt(to)) //添加查询规则
    builder.query(queryBuilder) //查询规则加到查询对象中
    val esQuery = builder.toString //将查询对象转为string格式
    // spark读取es的数据转为DataFrame
    val riskDetailFrame = EsSparkSQL.esDF(sparkSession, "risk_detail_index/_doc", esQuery)
    // 将查询出的风险详情表注册为临时表方便后面用sql进行统计
    riskDetailFrame.createOrReplaceTempView("driver_risk_statistical_day_index")
    val sql =
      s"""select concat(driverCode,DATE_FORMAT(riskDate,'yyyy-MM-dd')) id, --驾驶人身份证号码+日期
         |driverCode driverId,   --驾驶人身份证号码
         |driverName driverName,   --驾驶人姓名
         |driverEnterpriseCode enterpriseCode,  --企业编码
         |driverEnterprise enterpriseName,  --运输企业名称
         |0 driveRange,   --驾驶里程
         |DATE_FORMAT(riskDate,'yyyy-MM-dd') dayTime,   --日期
         |riskGradeCode,   --风险等级
         |count(new_warnList.primaryKey)  warnTotal,   --企业当日总报警数
         |count(1) riskTotal,  --驾驶员当日总风险数
         |sum(case when realFlag='0' then 1 else 0 end)  invalidRiskNum,  --无效风险次数
         |sum(case when realFlag='1' then 1 else 0 end)  validRiskNum,  --有效风险次数
         |sum(case when processStatus='0' then 1 else 0 end)  undisposedRiskNum,  --未处置风险次数
         |sum(case when riskGradeCode='1' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed1,  --风险等级编码1的未处置次数
         |sum(case when riskGradeCode='2' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed2,  --风险等级编码2的未处置次数
         |sum(case when riskGradeCode='3' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed3,  --风险等级编码3的未处置次数
         |sum(case when riskGradeCode='4' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed4,  --风险等级编码4的未处置次数
         |sum(case when riskGradeCode='1' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid1,  --风险等级编码1的有效风险次数
         |sum(case when riskGradeCode='2' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid2,  --风险等级编码2的有效风险次数
         |sum(case when riskGradeCode='3' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid3,  --风险等级编码3的有效风险次数
         |sum(case when riskGradeCode='4' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid4,  --风险等级编码4的有效风险次数
         |sum(case when riskGradeCode='1' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid1,  --风险等级编码1的无效风险次数
         |sum(case when riskGradeCode='2' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid2,  --风险等级编码2的无效风险次数
         |sum(case when riskGradeCode='3' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid3,  --风险等级编码3的无效风险次数
         |sum(case when riskGradeCode='4' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid4,  --风险等级编码4的无效风险次数
         |sum(case when riskGradeCode='1' then 1 else 0 end)  riskGradeCodeAll1,  --风险等级编码1的总风险次数
         |sum(case when riskGradeCode='2' then 1 else 0 end)  riskGradeCodeAll2,  --风险等级编码2的总风险次数
         |sum(case when riskGradeCode='3' then 1 else 0 end)  riskGradeCodeAll3,  --风险等级编码3的总风险次数
         |sum(case when riskGradeCode='4' then 1 else 0 end)  riskGradeCodeAll4,  --风险等级编码4的总风险次数
         |sum(case when riskTypeCode='1' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed1,  --风险类型编码1的未处置风险次数
         |sum(case when riskTypeCode='2' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed2,  --风险类型编码2的未处置风险次数
         |sum(case when riskTypeCode='3' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed3,  --风险类型编码3的未处置风险次数
         |sum(case when riskTypeCode='4' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed4,  --风险类型编码4的未处置风险次数
         |sum(case when riskTypeCode='1' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid1,  --风险类型编码1的有效风险次数
         |sum(case when riskTypeCode='2' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid2,  --风险类型编码2的有效风险次数
         |sum(case when riskTypeCode='3' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid3,  --风险类型编码3的有效风险次数
         |sum(case when riskTypeCode='4' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid4,  --风险类型编码4的有效风险次数
         |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid1,  --风险类型编码1的无效风险次数
         |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid2,  --风险类型编码2的无效风险次数
         |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid3,  --风险类型编码3的无效风险次数
         |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid4,  --风险类型编码4的无效风险次数
         |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll1,  --风险类型编码1的总风险次数
         |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll2,  --风险类型编码2的总风险次数
         |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll3,  --风险类型编码3的总风险次数
         |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll4  --风险类型编码4的总风险次数
         |from driver_risk_statistical_day_index lateral view explode(warnList) adTable as new_warnList
         |group by concat(driverCode,DATE_FORMAT(riskDate,'yyyy-MM-dd')),driverCode,driverName,driverEnterpriseCode,driverEnterprise,DATE_FORMAT(riskDate,'yyyy-MM-dd'),riskGradeCode""".stripMargin
    //另起列名
    val dataFrame: DataFrame = sparkSession.sql(sql)
    dataFrame.show(10,false)
    dataFrame
  }

  /**
   * 企业端-风险统计-企业统计（按天）
   *
   * @param sparkSession
   * @return
   */
  def getEnterpriseRiskStatisticalByDay(sparkSession: SparkSession): DataFrame = {
    // 风险逾期处理时长(分钟)
    val duration_value_sql =
      """
        |(select duration_value from zcov.config_risk_time_config where id = 3
        |)base_risk_time_config
      """.stripMargin

    val durationValues: Array[Long] = selectFromMysql(sparkSession, mysqlConfig, duration_value_sql)
      .collect()
      .map(_.getAs[String]("duration_value").toLong)
    // 未逾期处理风险的阈值,单位：秒
    val notOverdueThreshold: Long = durationValues(0) * 60

    val format = new SimpleDateFormat("yyyy-MM-dd 00:00:00")
    val time = new DateTime().toString("yyyy-MM-dd 00:00:00")
    val time1 = new DateTime().plusDays(-1).toString("yyyy-MM-dd 00:00:00")
    val to: Long = format.parse(time).getTime
    val from: Long = format.parse(time1).getTime
    var builder: SearchSourceBuilder = new SearchSourceBuilder() //用于查询的对象
    val queryBuilder = QueryBuilders.boolQuery() //添加查询规则
    queryBuilder.must(QueryBuilders.rangeQuery("riskDate").gte(from).lt(to)) //添加查询规则
    builder.query(queryBuilder) //查询规则加到查询对象中
    val esQuery = builder.toString //将查询对象转为string格式
    // spark读取es的数据转为DataFrame
    val riskDetailFrame = EsSparkSQL.esDF(sparkSession, "risk_detail_index/_doc", esQuery)
    // 将查询出的风险详情表注册为临时表方便后面用sql进行统计
    riskDetailFrame.createOrReplaceTempView("enterprise_risk_statistical_day_index")
    val sql = s"""select concat(vehicleEnterpriseCode,DATE_FORMAT(riskDate,'yyyy-MM-dd')) id, --企业编码+日期
                 |vehicleEnterpriseCode enterpriseCode,   --企业编码
                 |vehicleEnterprise enterpriseName,   --运输企业名称
                 |0 driveRange,   --驾驶里程
                 |DATE_FORMAT(riskDate,'yyyy-MM-dd') dayTime,   --日期
                 |riskGradeCode,   --风险等级
                 |count(new_warnList.primaryKey)  warnTotal,   --企业当日总报警数                 ????需要统计表中warnList字段里面的个数
                 |count(1) riskTotal,  --企业当日总风险数
                 |sum(case when realFlag='0' then 1 else 0 end)  invalidRiskNum,  --无效风险次数
                 |sum(case when realFlag='1' then 1 else 0 end)  validRiskNum,  --有效风险次数
                 |sum(case when processStatus='0' then 1 else 0 end)  undisposedRiskNum,  --未处置风险次数
                 |sum(case when processStatus in ('10','11') then 1 else 0 end)  riskProcessNum,  --已处理的风险总次数
                 |sum(case when processStatus in ('10','11') and unix_timestamp(processDate, 'yyyy-MM-dd HH:mm:ss') - riskDate/1000L < ${notOverdueThreshold} then 1 else 0 end)  riskTimelyNum,  --风险及时处理的总次数
                 |sum(case when processStatus in ('10','11') and unix_timestamp(processDate, 'yyyy-MM-dd HH:mm:ss') - riskDate/1000L < ${notOverdueThreshold}  and realFlag='1' then 1 else 0 end)  validRiskTimelyNum,  --风险及时处理的有效次数
                 |sum(case when processStatus in ('10','11') and unix_timestamp(processDate, 'yyyy-MM-dd HH:mm:ss') - riskDate/1000L < ${notOverdueThreshold}  and realFlag='0' then 1 else 0 end)  invalidRiskTimelyNum,  --风险及时处理的无效次数
                 |sum(case when riskGradeCode='1' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed1,  --风险等级编码1的未处置次数
                 |sum(case when riskGradeCode='2' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed2,  --风险等级编码2的未处置次数
                 |sum(case when riskGradeCode='3' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed3,  --风险等级编码3的未处置次数
                 |sum(case when riskGradeCode='4' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed4,  --风险等级编码4的未处置次数
                 |sum(case when riskGradeCode='1' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid1,  --风险等级编码1的有效风险次数
                 |sum(case when riskGradeCode='2' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid2,  --风险等级编码2的有效风险次数
                 |sum(case when riskGradeCode='3' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid3,  --风险等级编码3的有效风险次数
                 |sum(case when riskGradeCode='4' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid4,  --风险等级编码4的有效风险次数
                 |sum(case when riskGradeCode='1' and realFlag='0' then 1 else 0 end)  riskGradeCodeInValid1,  --风险等级编码1的无效风险次数
                 |sum(case when riskGradeCode='2' and realFlag='0' then 1 else 0 end)  riskGradeCodeInValid2,  --风险等级编码2的无效风险次数
                 |sum(case when riskGradeCode='3' and realFlag='0' then 1 else 0 end)  riskGradeCodeInValid3,  --风险等级编码3的无效风险次数
                 |sum(case when riskGradeCode='4' and realFlag='0' then 1 else 0 end)  riskGradeCodeInValid4,  --风险等级编码4的无效风险次数
                 |sum(case when riskGradeCode='1' then 1 else 0 end)  riskGradeCodeAll1,  --风险等级编码1的总风险次数
                 |sum(case when riskGradeCode='2' then 1 else 0 end)  riskGradeCodeAll2,  --风险等级编码2的总风险次数
                 |sum(case when riskGradeCode='3' then 1 else 0 end)  riskGradeCodeAll3,  --风险等级编码3的总风险次数
                 |sum(case when riskGradeCode='4' then 1 else 0 end)  riskGradeCodeAll4,  --风险等级编码4的总风险次数
                 |sum(case when riskTypeCode='1' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed1,  --风险类型编码1的未处置风险次数
                 |sum(case when riskTypeCode='2' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed2,  --风险类型编码2的未处置风险次数
                 |sum(case when riskTypeCode='3' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed3,  --风险类型编码3的未处置风险次数
                 |sum(case when riskTypeCode='4' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed4,  --风险类型编码4的未处置风险次数
                 |sum(case when riskTypeCode='1' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid1,  --风险类型编码1的有效风险次数
                 |sum(case when riskTypeCode='2' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid2,  --风险类型编码2的有效风险次数
                 |sum(case when riskTypeCode='3' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid3,  --风险类型编码3的有效风险次数
                 |sum(case when riskTypeCode='4' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid4,  --风险类型编码4的有效风险次数
                 |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid1,  --风险类型编码1的无效风险次数
                 |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid2,  --风险类型编码2的无效风险次数
                 |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid3,  --风险类型编码3的无效风险次数
                 |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid4,  --风险类型编码4的无效风险次数
                 |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll1,  --风险类型编码1的总风险次数
                 |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll2,  --风险类型编码2的总风险次数
                 |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll3,  --风险类型编码3的总风险次数
                 |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll4  --风险类型编码4的总风险次数
                 |from enterprise_risk_statistical_day_index lateral view explode(warnList) adTable as new_warnList
                 |group by concat(vehicleEnterpriseCode,DATE_FORMAT(riskDate,'yyyy-MM-dd')),vehicleEnterpriseCode,vehicleEnterprise,DATE_FORMAT(riskDate,'yyyy-MM-dd'),riskGradeCode""".stripMargin
    val dataFrame: DataFrame = sparkSession.sql(sql)
    dataFrame.show(10,false)
    dataFrame
  }

  /**
   * 企业端-风险统计-车辆（按天）
   *
   * @param sparkSession
   * @return
   */
  def getVehicleRiskStatisticalByDay(sparkSession: SparkSession): DataFrame = {
    val format = new SimpleDateFormat("yyyy-MM-dd 00:00:00")
    val time = new DateTime().toString("yyyy-MM-dd 00:00:00")
    val time1 = new DateTime().plusDays(-1).toString("yyyy-MM-dd 00:00:00")
    val to: Long = format.parse(time).getTime
    val from: Long = format.parse(time1).getTime
    var builder: SearchSourceBuilder = new SearchSourceBuilder() //用于查询的对象
    val queryBuilder = QueryBuilders.boolQuery() //添加查询规则
    queryBuilder.must(QueryBuilders.rangeQuery("riskDate").gte(from).lt(to)) //添加查询规则
    builder.query(queryBuilder) //查询规则加到查询对象中
    val esQuery = builder.toString //将查询对象转为string格式
    // spark读取es的数据转为DataFrame
    val riskDetailFrame = EsSparkSQL.esDF(sparkSession, "risk_detail_index/_doc", esQuery)
    // 将查询出的风险详情表注册为临时表方便后面用sql进行统计
    riskDetailFrame.createOrReplaceTempView("vehicle_risk_statistical_day_index")
    val sql = s"""select concat(concat(vehicleNo,vehicleColor),DATE_FORMAT(riskDate,'yyyy-MM-dd')) id, --车牌号+车牌颜色+日期
                 |driverEnterpriseCode enterpriseCode,  --企业编码
                 |driverEnterprise enterpriseName,  --运输企业名称
                 |vehicleNo,   --车牌号
                 |vehicleColor,   --车牌颜色
                 |concat(concat(vehicleNo,"_"),vehicleColor) vehicleCode,  --车牌号和车牌颜色组合key(e:车牌号+下划线+车牌颜色)
                 |useNature,  --营运性质
                 |'' businessScope,  --经营范围
                 |'' businessScopeDetail,  --经营范围详细
                 |0 driveRange,   --驾驶里程
                 |DATE_FORMAT(riskDate,'yyyy-MM-dd') dayTime,   --日期
                 |riskGradeCode,   --风险等级
                 |count(new_warnList.primaryKey)  warnTotal,   --企业当日总报警数
                 |count(1) riskTotal,  --车辆当日总风险数
                 |sum(case when realFlag='0' then 1 else 0 end)  invalidRiskNum,  --无效风险次数
                 |sum(case when realFlag='1' then 1 else 0 end)  validRiskNum,  --有效风险次数
                 |sum(case when processStatus='0' then 1 else 0 end)  undisposedRiskNum,  --未处置风险次数
                 |sum(case when riskGradeCode='1' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed1,  --风险等级编码1的未处置次数
                 |sum(case when riskGradeCode='2' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed2,  --风险等级编码2的未处置次数
                 |sum(case when riskGradeCode='3' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed3,  --风险等级编码3的未处置次数
                 |sum(case when riskGradeCode='4' and processDate is null then 1 else 0 end)  riskGradeCodeUndisposed4,  --风险等级编码4的未处置次数
                 |sum(case when riskGradeCode='1' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid1,  --风险等级编码1的有效风险次数
                 |sum(case when riskGradeCode='2' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid2,  --风险等级编码2的有效风险次数
                 |sum(case when riskGradeCode='3' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid3,  --风险等级编码3的有效风险次数
                 |sum(case when riskGradeCode='4' and realFlag='1' then 1 else 0 end)  riskGradeCodeValid4,  --风险等级编码4的有效风险次数
                 |sum(case when riskGradeCode='1' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid1,  --风险等级编码1的无效风险次数
                 |sum(case when riskGradeCode='2' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid2,  --风险等级编码2的无效风险次数
                 |sum(case when riskGradeCode='3' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid3,  --风险等级编码3的无效风险次数
                 |sum(case when riskGradeCode='4' and realFlag='0' then 1 else 0 end)  riskGradeCodeInvalid4,  --风险等级编码4的无效风险次数
                 |sum(case when riskGradeCode='1' then 1 else 0 end)  riskGradeCodeAll1,  --风险等级编码1的总风险次数
                 |sum(case when riskGradeCode='2' then 1 else 0 end)  riskGradeCodeAll2,  --风险等级编码2的总风险次数
                 |sum(case when riskGradeCode='3' then 1 else 0 end)  riskGradeCodeAll3,  --风险等级编码3的总风险次数
                 |sum(case when riskGradeCode='4' then 1 else 0 end)  riskGradeCodeAll4,  --风险等级编码4的总风险次数
                 |sum(case when riskTypeCode='1' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed1,  --风险类型编码1的未处置风险次数
                 |sum(case when riskTypeCode='2' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed2,  --风险类型编码2的未处置风险次数
                 |sum(case when riskTypeCode='3' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed3,  --风险类型编码3的未处置风险次数
                 |sum(case when riskTypeCode='4' and processDate is null then 1 else 0 end)  riskTypeCodeUndisposed4,  --风险类型编码4的未处置风险次数
                 |sum(case when riskTypeCode='1' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid1,  --风险类型编码1的有效风险次数
                 |sum(case when riskTypeCode='2' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid2,  --风险类型编码2的有效风险次数
                 |sum(case when riskTypeCode='3' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid3,  --风险类型编码3的有效风险次数
                 |sum(case when riskTypeCode='4' and realFlag='1' then 1 else 0 end)  riskTypeCodeValid4,  --风险类型编码4的有效风险次数
                 |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid1,  --风险类型编码1的无效风险次数
                 |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid2,  --风险类型编码2的无效风险次数
                 |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid3,  --风险类型编码3的无效风险次数
                 |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeInValid4,  --风险类型编码4的无效风险次数
                 |sum(case when riskTypeCode='1' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll1,  --风险类型编码1的总风险次数
                 |sum(case when riskTypeCode='2' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll2,  --风险类型编码2的总风险次数
                 |sum(case when riskTypeCode='3' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll3,  --风险类型编码3的总风险次数
                 |sum(case when riskTypeCode='4' and realFlag='0' then 1 else 0 end)  riskTypeCodeAll4  --风险类型编码4的总风险次数
                 |from vehicle_risk_statistical_day_index lateral view explode(warnList) adTable as new_warnList
                 |group by concat(concat(vehicleNo,vehicleColor),DATE_FORMAT(riskDate,'yyyy-MM-dd')),driverEnterpriseCode,driverEnterprise,vehicleNo,vehicleColor,concat(concat(vehicleNo,"_"),vehicleColor),
                 |useNature,DATE_FORMAT(riskDate,'yyyy-MM-dd'),riskGradeCode""".stripMargin
    val dataFrame: DataFrame = sparkSession.sql(sql)
    dataFrame.show(10,false)
    dataFrame
  }
}

